tutorials/014 - Schema Evolution.ipynb

{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n", "\n", "# 14 - Schema Evolution\n", "\n", "awswrangler supports new **columns** on Parquet and CSV datasets through:\n", "\n", "- [wr.s3.to_parquet()](https://aws-sdk-pandas.readthedocs.io/en/3.11.0/stubs/awswrangler.s3.to_parquet.html#awswrangler.s3.to_parquet)\n", "- [wr.s3.store_parquet_metadata()](https://aws-sdk-pandas.readthedocs.io/en/3.11.0/stubs/awswrangler.s3.store_parquet_metadata.html#awswrangler.s3.store_parquet_metadata) i.e. \"Crawler\"\n", "- [wr.s3.to_csv()](https://aws-sdk-pandas.readthedocs.io/en/3.11.0/stubs/awswrangler.s3.to_csv.html#awswrangler.s3.to_csv)" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from datetime import date\n", "\n", "import pandas as pd\n", "\n", "import awswrangler as wr" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Enter your bucket name:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " ···········································\n" ] } ], "source": [ "import getpass\n", "\n", "bucket = getpass.getpass()\n", "path = f\"s3://{bucket}/dataset/\"" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Creating the Dataset\n", "### Parquet Create" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>id</th>\n", " <th>value</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>1</td>\n", " <td>foo</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>2</td>\n", " <td>boo</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " id value\n", "0 1 foo\n", "1 2 boo" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = pd.DataFrame(\n", " {\n", " \"id\": [1, 2],\n", " \"value\": [\"foo\", \"boo\"],\n", " }\n", ")\n", "\n", "wr.s3.to_parquet(df=df, path=path, dataset=True, mode=\"overwrite\", database=\"aws_sdk_pandas\", table=\"my_table\")\n", "\n", "wr.s3.read_parquet(path, dataset=True)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "### CSV Create" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "df = pd.DataFrame(\n", " {\n", " \"id\": [1, 2],\n", " \"value\": [\"foo\", \"boo\"],\n", " }\n", ")\n", "\n", "wr.s3.to_csv(df=df, path=path, dataset=True, mode=\"overwrite\", database=\"aws_sdk_pandas\", table=\"my_table\")\n", "\n", "wr.s3.read_csv(path, dataset=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Schema Version 0 on Glue Catalog (AWS Console)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Glue Console](_static/glue_catalog_version_0.png \"Glue Console\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Appending with NEW COLUMNS\n", "### 
Parquet Append" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>id</th>\n", " <th>value</th>\n", " <th>date</th>\n", " <th>flag</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>3</td>\n", " <td>bar</td>\n", " <td>2020-01-03</td>\n", " <td>True</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>4</td>\n", " <td>None</td>\n", " <td>2020-01-04</td>\n", " <td>False</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>1</td>\n", " <td>foo</td>\n", " <td>NaN</td>\n", " <td>NaN</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>2</td>\n", " <td>boo</td>\n", " <td>NaN</td>\n", " <td>NaN</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " id value date flag\n", "0 3 bar 2020-01-03 True\n", "1 4 None 2020-01-04 False\n", "2 1 foo NaN NaN\n", "3 2 boo NaN NaN" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = pd.DataFrame(\n", " {\"id\": [3, 4], \"value\": [\"bar\", None], \"date\": [date(2020, 1, 3), date(2020, 1, 4)], \"flag\": [True, False]}\n", ")\n", "\n", "wr.s3.to_parquet(\n", " df=df,\n", " path=path,\n", " dataset=True,\n", " mode=\"append\",\n", " database=\"aws_sdk_pandas\",\n", " table=\"my_table\",\n", " catalog_versioning=True, # Optional\n", ")\n", "\n", "wr.s3.read_parquet(path, dataset=True, validate_schema=False)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "### CSV Append\n", "\n", "Note: for CSV datasets due to [column ordering](https://docs.aws.amazon.com/athena/latest/ug/types-of-updates.html#updates-add-columns-beginning-middle-of-table), by default, schema evolution is disabled. 
Enable it by passing the `schema_evolution=True` flag." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "df = pd.DataFrame(\n", " {\"id\": [3, 4], \"value\": [\"bar\", None], \"date\": [date(2020, 1, 3), date(2020, 1, 4)], \"flag\": [True, False]}\n", ")\n", "\n", "wr.s3.to_csv(\n", " df=df,\n", " path=path,\n", " dataset=True,\n", " mode=\"append\",\n", " database=\"aws_sdk_pandas\",\n", " table=\"my_table\",\n", " schema_evolution=True,\n", " catalog_versioning=True, # Optional\n", ")\n", "\n", "wr.s3.read_csv(path, dataset=True, validate_schema=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Schema Version 1 on Glue Catalog (AWS Console)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Glue Console](_static/glue_catalog_version_1.png \"Glue Console\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reading from Athena" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>id</th>\n", " <th>value</th>\n", " <th>date</th>\n", " <th>flag</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>3</td>\n", " <td>bar</td>\n", " <td>2020-01-03</td>\n", " <td>True</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>4</td>\n", " <td>None</td>\n", " <td>2020-01-04</td>\n", " <td>False</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>1</td>\n", " <td>foo</td>\n", " <td>None</td>\n", " <td>&lt;NA&gt;</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>2</td>\n", " <td>boo</td>\n", " <td>None</td>\n", " <td>&lt;NA&gt;</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " id value date flag\n", "0 3 bar 2020-01-03 True\n", "1 4 None 2020-01-04 False\n", "2 1 foo None <NA>\n", "3 2 boo None <NA>" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "wr.athena.read_sql_table(table=\"my_table\", database=\"aws_sdk_pandas\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleaning Up" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "True" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "wr.s3.delete_objects(path)\n", "wr.catalog.delete_table_if_exists(table=\"my_table\", database=\"aws_sdk_pandas\")" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.14", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.14" } }, "nbformat": 4, "nbformat_minor": 4 }
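Addendum: the notebook's introduction lists `wr.s3.store_parquet_metadata()` (the "Crawler"-style option) as a third way to pick up new columns, but never runs it. Below is a minimal sketch of that approach, assuming the same `path`, Glue database (`aws_sdk_pandas`) and table (`my_table`) used in the cells above; it infers the evolved schema directly from the Parquet files already in S3 and registers it in the Glue Catalog.

```python
import awswrangler as wr

# Infer the schema (including the new "date" and "flag" columns) straight from the
# Parquet files under `path` and store it in the Glue Catalog, much like a Glue Crawler.
columns_types, partitions_types, partitions_values = wr.s3.store_parquet_metadata(
    path=path,  # f"s3://{bucket}/dataset/" from the setup cell above
    database="aws_sdk_pandas",
    table="my_table",
    dataset=True,
)

# columns_types is expected to include the evolved columns, e.g. keys id, value, date, flag
print(columns_types)
```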